fix(platform): patch for entity creation, honor async flag on request (#6504)

This commit is contained in:
RyanHolstien 2022-11-22 13:05:09 -06:00 committed by GitHub
parent 1229c1f60d
commit 848574dbff
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 64 additions and 4 deletions

View File

@ -73,9 +73,10 @@ public class AspectUtils {
try {
MetadataChangeProposal proposal = original.copy();
GenericAspect genericAspect = GenericRecordUtils.serializeAspect(aspect);
// Set UPSERT changetype here as additional changes being added should always be
// done in UPSERT mode even for patches
// proposal.setChangeType(ChangeType.UPSERT);
// Additional changes should never be set as PATCH, if a PATCH is coming across it should be an UPSERT
if (ChangeType.PATCH.equals(proposal.getChangeType())) {
proposal.setChangeType(ChangeType.UPSERT);
}
proposal.setAspect(genericAspect);
proposal.setAspectName(aspectName);
return proposal;

View File

@ -0,0 +1,59 @@
package com.linkedin.metadata;
import com.linkedin.common.FabricType;
import com.linkedin.common.urn.DataPlatformUrn;
import com.linkedin.common.urn.DatasetUrn;
import com.linkedin.dataset.DatasetProperties;
import com.linkedin.events.metadata.ChangeType;
import com.linkedin.metadata.entity.AspectUtils;
import com.linkedin.metadata.entity.EntityService;
import com.linkedin.metadata.entity.TestEntityRegistry;
import com.linkedin.metadata.entity.ebean.EbeanAspectDao;
import com.linkedin.metadata.event.EventProducer;
import com.linkedin.metadata.models.registry.ConfigEntityRegistry;
import com.linkedin.metadata.models.registry.EntityRegistry;
import com.linkedin.metadata.models.registry.EntityRegistryException;
import com.linkedin.metadata.models.registry.MergedEntityRegistry;
import com.linkedin.metadata.snapshot.Snapshot;
import com.linkedin.metadata.utils.GenericRecordUtils;
import com.linkedin.mxe.MetadataChangeProposal;
import io.ebean.EbeanServer;
import java.util.List;
import org.testng.Assert;
import org.testng.annotations.Test;
import static org.mockito.Mockito.*;
public class AspectUtilsTest {
protected final EntityRegistry _snapshotEntityRegistry = new TestEntityRegistry();
protected final EntityRegistry _configEntityRegistry =
new ConfigEntityRegistry(Snapshot.class.getClassLoader().getResourceAsStream("entity-registry.yml"));
protected final EntityRegistry _testEntityRegistry =
new MergedEntityRegistry(_snapshotEntityRegistry).apply(_configEntityRegistry);
public AspectUtilsTest() throws EntityRegistryException {
}
@Test
public void testAdditionalChanges() {
EbeanServer server = EbeanTestUtils.createTestServer();
EbeanAspectDao aspectDao = new EbeanAspectDao(server);
aspectDao.setConnectionValidated(true);
EventProducer mockProducer = mock(EventProducer.class);
EntityService entityService = new EntityService(aspectDao, mockProducer, _testEntityRegistry);
MetadataChangeProposal proposal1 = new MetadataChangeProposal();
proposal1.setEntityUrn(new DatasetUrn(new DataPlatformUrn("platform"), "name", FabricType.PROD));
proposal1.setAspectName("datasetProperties");
DatasetProperties datasetProperties = new DatasetProperties().setName("name");
proposal1.setAspect(GenericRecordUtils.serializeAspect(datasetProperties));
proposal1.setEntityType("dataset");
proposal1.setChangeType(ChangeType.PATCH);
List<MetadataChangeProposal> proposalList = AspectUtils.getAdditionalChanges(proposal1, entityService);
Assert.assertEquals(proposalList.size(), 3);
Assert.assertEquals(proposalList.get(0).getChangeType(), ChangeType.UPSERT);
}
}

View File

@ -628,7 +628,7 @@ public class RestliEntityClient extends BaseClient implements EntityClient {
public String ingestProposal(@Nonnull final MetadataChangeProposal metadataChangeProposal,
@Nonnull final Authentication authentication, final boolean async) throws RemoteInvocationException {
final AspectsDoIngestProposalRequestBuilder requestBuilder =
ASPECTS_REQUEST_BUILDERS.actionIngestProposal().proposalParam(metadataChangeProposal);
ASPECTS_REQUEST_BUILDERS.actionIngestProposal().proposalParam(metadataChangeProposal).asyncParam(String.valueOf(async));
return sendClientRequest(requestBuilder, authentication).getEntity();
}