fix(no-rows-updated): fix no rows updated (#12530)

This commit is contained in:
david-leifker 2025-02-05 08:36:34 -06:00 committed by GitHub
parent 32b654ce20
commit 66bce0d9f1
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 118 additions and 9 deletions

View File

@ -1031,7 +1031,17 @@ public class EntityServiceImpl implements EntityService<ChangeItemImpl> {
if (!upsertResults.isEmpty()) {
// commit upserts prior to retention or kafka send, if supported by impl
if (txContext != null) {
txContext.commitAndContinue();
try {
txContext.commitAndContinue();
} catch (EntityNotFoundException e) {
if (e.getMessage() != null
&& e.getMessage().contains("No rows updated")) {
log.debug("Ignoring no rows updated condition for metadata update", e);
MetricUtils.counter(EntityServiceImpl.class, "no_rows_updated").inc();
return TransactionResult.rollback();
}
throw e;
}
}
// Retention optimization and tx

View File

@ -3,8 +3,21 @@ package com.linkedin.metadata.entity;
import static com.linkedin.metadata.Constants.CORP_USER_ENTITY_NAME;
import static com.linkedin.metadata.Constants.STATUS_ASPECT_NAME;
import static com.linkedin.metadata.entity.ebean.EbeanAspectDao.TX_ISOLATION;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
@ -41,6 +54,7 @@ import io.datahubproject.test.metadata.context.TestOperationContexts;
import io.ebean.Database;
import io.ebean.Transaction;
import io.ebean.TxScope;
import jakarta.persistence.EntityNotFoundException;
import java.net.URISyntaxException;
import java.sql.Timestamp;
import java.time.Instant;
@ -50,10 +64,11 @@ import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.commons.lang3.tuple.Triple;
import org.testng.Assert;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
@ -112,14 +127,98 @@ public class EbeanEntityServiceTest
null);
}
/**
* Ideally, all tests would be in the base class, so they're reused between all implementations.
* When that's the case - test runner will ignore this class (and its base!) so we keep this dummy
* test to make sure this class will always be discovered.
*/
@Test
public void obligatoryTest() throws AssertionError {
Assert.assertTrue(true);
public void testNoRowsUpdatedErrorHandling() throws Exception {
// Setup test data
Urn entityUrn = UrnUtils.getUrn("urn:li:corpuser:testUser");
SystemMetadata systemMetadata = AspectGenerationUtils.createSystemMetadata();
CorpUserInfo writeAspect = AspectGenerationUtils.createCorpUserInfo("email@test.com");
String aspectName = PegasusUtils.getAspectNameFromSchema(writeAspect.schema());
// Create database and spy on aspectDao
Database server = EbeanTestUtils.createTestServer(EbeanEntityServiceTest.class.getSimpleName());
EbeanAspectDao aspectDao = spy(new EbeanAspectDao(server, EbeanConfiguration.testDefault));
// Prevent actual saves
doNothing().when(aspectDao).saveAspect(any(), any(), anyBoolean());
doReturn(0L)
.when(aspectDao)
.saveLatestAspect(
any(),
anyString(),
anyString(),
any(),
any(),
any(),
any(),
any(),
anyString(),
anyString(),
any(),
any(),
any(),
anyLong());
// Create spied transaction context that throws on commitAndContinue
AtomicReference<TransactionContext> capturedTxContext = new AtomicReference<>();
AtomicReference<TransactionResult<?>> capturedResult = new AtomicReference<>();
doAnswer(
invocation -> {
Function<TransactionContext, TransactionResult<?>> block = invocation.getArgument(0);
Integer maxTransactionRetry = invocation.getArgument(2);
TransactionContext txContext = spy(TransactionContext.empty(maxTransactionRetry));
capturedTxContext.set(txContext);
doThrow(new EntityNotFoundException("No rows updated"))
.when(txContext)
.commitAndContinue();
TransactionResult<?> result = block.apply(txContext);
capturedResult.set(result);
return result.getResults();
})
.when(aspectDao)
.runInTransactionWithRetry(any(), any(), anyInt());
// Create the service with our spied dao
PreProcessHooks preProcessHooks = new PreProcessHooks();
preProcessHooks.setUiEnabled(false);
EntityServiceImpl entityService =
new EntityServiceImpl(aspectDao, _mockProducer, false, preProcessHooks, true);
// Create the test batch
List<ChangeItemImpl> items =
List.of(
ChangeItemImpl.builder()
.urn(entityUrn)
.aspectName(aspectName)
.recordTemplate(writeAspect)
.systemMetadata(systemMetadata)
.auditStamp(TEST_AUDIT_STAMP)
.build(TestOperationContexts.emptyActiveUsersAspectRetriever(null)));
AspectsBatchImpl batch =
AspectsBatchImpl.builder()
.retrieverContext(opContext.getRetrieverContext())
.items(items)
.build();
// Execute the test
List<UpdateAspectResult> results = entityService.ingestAspects(opContext, batch, false, true);
// Verify results
assertEquals(results.size(), 0, "Expected no results for rolled back transaction");
// Verify transaction behavior
verify(aspectDao).runInTransactionWithRetry(any(), eq(batch), anyInt());
verify(capturedTxContext.get()).commitAndContinue();
// Verify the transaction result was a rollback
TransactionResult<?> result = capturedResult.get();
assertNotNull(result, "Expected a transaction result");
assertFalse(result.isCommitOrRollback(), "Expected a rollback result");
}
@Override